package com.gitee.Jmysy.binlog4j.core;

import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.util.TypeUtils;
import com.gitee.Jmysy.binlog4j.core.dispatcher.BinlogEventDispatcher;
import com.gitee.Jmysy.binlog4j.core.meta.ColumnMetadata;
import com.gitee.Jmysy.binlog4j.core.utils.ClassUtils;
import com.gitee.Jmysy.binlog4j.core.utils.JDBCUtils;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Binlog 处理器包装类
 *
 * @author 就眠儀式
 * */
@Data
@Slf4j
public class BinlogEventHandlerInvoker<T> {

    private Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();

    private IBinlogEventHandler eventHandler;

    private BinlogClientConfig clientConfig;

    private Class<T> genericClass;

    private static final ParserConfig SNAKE_CASE;

    static {
        SNAKE_CASE = new ParserConfig();
        SNAKE_CASE.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;
    }

    public void invokeInsert(long serverId, String databaseName, String tableName, List<Serializable[]> data, long timestamp) {
        if(eventHandler.isHandle(serverId,databaseName, tableName)) {
            List<ColumnMetadata> columns = getColumns(databaseName, tableName);
            BinlogEvent binlogEvent = createBinlogEvent(databaseName, tableName);
            data.forEach(row -> {
                binlogEvent.setData(toEntity(columns, row));
                binlogEvent.setColumns(columns);
                eventHandler.onInsert(serverId,binlogEvent,timestamp);
            });
        }
    }
    public void invokeDDL(long serverId, String databaseName, String tableName, String  sql, long timestamp) {
        if(null==tableName||""==tableName||"".equals(tableName)){
            eventHandler.onDDL(serverId,databaseName,tableName,sql,timestamp);
        }else {
            if(eventHandler.isHandle(serverId,databaseName, tableName)) {
                eventHandler.onDDL(serverId,databaseName,tableName,sql,timestamp);
            }
        }
    }
    public void removeMap(String databaseName, String tableName){
        try {
            String tableSchema = String.format("%s.%s", databaseName, tableName);
            columnMetadataMap.remove(tableSchema);
        }catch (Exception e){
            log.error("removeMap======================>"+e);
        }

    }

    public void invokeUpdate(long serverId, String databaseName, String tableName, List<Map.Entry<Serializable[], Serializable[]>> data, long timestamp) {
        if(eventHandler.isHandle(serverId, databaseName, tableName)) {
            List<ColumnMetadata> columns = getColumns(databaseName, tableName);
            BinlogEvent binlogEvent = createBinlogEvent(databaseName, tableName);
            data.forEach(row -> {
                binlogEvent.setData(toEntity(columns, row.getValue()));
                binlogEvent.setOriginalData(toEntity(columns, row.getKey()));
                binlogEvent.setUpdatePk(updatePk(columns,binlogEvent));
                binlogEvent.setColumns(columns);
                eventHandler.onUpdate(serverId,binlogEvent,timestamp);
            });
        }
    }

    private boolean updatePk(List<ColumnMetadata> columns, BinlogEvent binlogEvent) {
        try {
            Map<String, Object> map = columns.stream().filter(e -> e.isPrimaryColumn()).collect(Collectors.toMap(ColumnMetadata::getColumnName, ColumnMetadata::getColumnName));
            Map<String,Object> data = (Map) binlogEvent.getData();
            Map<String,Object> org = (Map) binlogEvent.getOriginalData();
            for(Map.Entry<String, Object> entry:map.entrySet()){
                if(!data.get(entry.getKey()).equals(org.get(entry.getKey()))){
                    return true;
                }
            }
        }catch (Exception e){
            log.error("updatePk==============>"+e);
        }
        return false;
    }


    public void invokeDelete(long serverId, String databaseName, String tableName, List<Serializable[]> data, long timestamp) {
        if(eventHandler.isHandle(serverId, databaseName, tableName)) {
            List<ColumnMetadata> columns = getColumns(databaseName, tableName);
            BinlogEvent binlogEvent = createBinlogEvent(databaseName, tableName);
            data.forEach(row -> {
                binlogEvent.setData(toEntity(columns, row));
                binlogEvent.setColumns(columns);
                eventHandler.onDelete(serverId,binlogEvent,timestamp);
            });
        }
    }

    private BinlogEvent createBinlogEvent(String databaseName, String tableName) {
        BinlogEvent binlogEvent = new BinlogEvent<>();
        binlogEvent.setDatabase(databaseName);
        binlogEvent.setTable(tableName);
        binlogEvent.setTimestamp(System.currentTimeMillis());
        return binlogEvent;
    }

    public List<ColumnMetadata> getColumns(String databaseName, String tableName) {
        String tableSchema = String.format("%s.%s", databaseName, tableName);
        List<ColumnMetadata> columns = columnMetadataMap.get(tableSchema);
        if (columns == null || clientConfig.isStrict()) {
            columns = JDBCUtils.getColumns(clientConfig, databaseName, tableName);
            columnMetadataMap.put(tableSchema, columns);
        }
        return columns;
    }

    @SneakyThrows
    public T toEntity(List<ColumnMetadata> columns, Serializable[] data) {
          Map<String, Object> obj = new HashMap<>(columns.size());
          for (int i = 0; i < data.length; i++) {
              ColumnMetadata column = columns.get(i);
              Serializable fieldValue = data[i];
              if (fieldValue instanceof java.util.Date) {
                  if(fieldValue != null) {
                      data[i] = new Date(((Date) fieldValue).getTime() + clientConfig.getTimeOffset());
                  }
              } else if (fieldValue instanceof byte[]) {
                  if(fieldValue != null) {
                      if (genericClass != null) {
                          Field field = ClassUtils.getDeclaredField(genericClass, column.getColumnName());
                          if(field != null) {
                              if (field.getType() == String.class) {
                                  data[i] = new String((byte[]) fieldValue, StandardCharsets.UTF_8);
                              }
                          }
                      }
                  }
              } else if (fieldValue instanceof BitSet) {
                  if(fieldValue != null) {
                      if (genericClass != null) {
                          Field field = ClassUtils.getDeclaredField(genericClass, column.getColumnName());
                          if(field != null) {
                              if (field.getType() == Boolean.class || field.getType() == boolean.class) {
                                  data[i] = !((BitSet) fieldValue).isEmpty();
                              }
                          }
                      }
                  }
              }
              obj.put(column.getColumnName(), data[i]);
          }
          if (genericClass != null) {
              return TypeUtils.cast(obj, genericClass, SNAKE_CASE);
          }
          return (T) obj;
    }

    public void setEventHandler(IBinlogEventHandler eventHandler) {
        this.eventHandler = eventHandler;
        this.genericClass = ClassUtils.getGenericType(eventHandler.getClass());
    }
}
